package storage

import (
	"encoding/json"
	"fmt"
	"net/http"

	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
	metainternalversionvalidation "k8s.io/apimachinery/pkg/apis/meta/internalversion/validation"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	genericrequest "k8s.io/apiserver/pkg/endpoints/request"
	"k8s.io/apiserver/pkg/registry/rest"
	"k8s.io/klog/v2"

	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
	searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme"
)

type errorResponse struct {
	Error string `json:"error"`
}

type reqResponse struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`

	Items []runtime.Object `json:"items"`
}

func (r *SearchREST) newCacheHandler(info *genericrequest.RequestInfo, responder rest.Responder) (http.Handler, error) {
	resourceGVR := schema.GroupVersionResource{
		Group:    info.APIGroup,
		Version:  info.APIVersion,
		Resource: info.Resource,
	}

	return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		enc := json.NewEncoder(rw)

		opts := metainternalversion.ListOptions{}
		if err := searchscheme.ParameterCodec.DecodeParameters(req.URL.Query(), metav1.SchemeGroupVersion, &opts); err != nil {
			rw.WriteHeader(http.StatusBadRequest)
			klog.Errorf("Failed to decode parameters from req.URL.Query(): %v", err)
			_ = enc.Encode(errorResponse{Error: err.Error()})
			return
		}

		if errs := metainternalversionvalidation.ValidateListOptions(&opts); len(errs) > 0 {
			rw.WriteHeader(http.StatusBadRequest)
			klog.Errorf("Invalid decoded ListOptions: %v.", errs)
			_ = enc.Encode(errorResponse{Error: errs.ToAggregate().Error()})
			return
		}

		label := labels.Everything()
		if opts.LabelSelector != nil {
			label = opts.LabelSelector
		}

		clusters, err := r.clusterLister.List(labels.Everything())
		if err != nil {
			rw.WriteHeader(http.StatusInternalServerError)
			_ = enc.Encode(errorResponse{Error: fmt.Sprintf("Failed to list clusters: %v", err)})
			return
		}

		// TODO: process opts.Limit to prevent the client from being unable to process the response
		// due to the large size of the response body.
		objItems := r.getObjectItemsFromClusters(clusters, resourceGVR, info.Namespace, info.Name, label)
		kind, err := r.restMapper.KindFor(resourceGVR)
		if err != nil {
			rw.WriteHeader(http.StatusInternalServerError)
			klog.Errorf("Failed to find kind, resource: %s, %v", resourceGVR.Resource, err)
			_ = enc.Encode(errorResponse{Error: err.Error()})
			return
		}
		rr := reqResponse{
			TypeMeta: metav1.TypeMeta{
				APIVersion: resourceGVR.GroupVersion().String(),
				Kind:       kind.Kind + "List",
			},
			Items: objItems,
		}
		rw.Header().Set("Content-Type", "application/json; charset=utf-8")
		_ = enc.Encode(rr)
	}), nil
}

func (r *SearchREST) getObjectItemsFromClusters(
	clusters []*clusterv1alpha1.Cluster,
	objGVR schema.GroupVersionResource,
	namespace, name string,
	label labels.Selector) []runtime.Object {
	items := make([]runtime.Object, 0)

	// TODO: encapsulating the Interface for Obtaining resource from the Multi-Cluster Cache.
	for _, cluster := range clusters {
		singleClusterManger := r.multiClusterInformerManager.GetSingleClusterManager(cluster.Name)
		if singleClusterManger == nil {
			klog.Warningf("SingleClusterInformerManager for cluster(%s) is nil.", cluster.Name)
			continue
		}

		var err error
		objLister := singleClusterManger.Lister(objGVR)
		if len(name) > 0 {
			var resourceObject runtime.Object
			if len(namespace) > 0 {
				resourceObject, err = objLister.ByNamespace(namespace).Get(name)
			} else {
				resourceObject, err = objLister.Get(name)
			}
			if err != nil {
				klog.Errorf("Failed to get %s resource(%s/%s) from cluster(%s)'s informer cache: %v",
					objGVR, namespace, name, cluster.Name, err)
				continue
			}
			items = append(items, addAnnotationWithClusterName([]runtime.Object{resourceObject}, cluster.Name)...)
		} else {
			var resourceObjects []runtime.Object
			if len(namespace) > 0 {
				resourceObjects, err = objLister.ByNamespace(namespace).List(label)
			} else {
				resourceObjects, err = objLister.List(label)
			}
			if err != nil {
				klog.Errorf("Failed to list %s resource from cluster(%s)'s informer cache: %v",
					objGVR, cluster.Name, err)
				continue
			}
			items = append(items, addAnnotationWithClusterName(resourceObjects, cluster.Name)...)
		}
	}

	return items
}

func addAnnotationWithClusterName(resourceObjects []runtime.Object, clusterName string) []runtime.Object {
	resources := make([]runtime.Object, 0)
	for index := range resourceObjects {
		resource := resourceObjects[index].(*unstructured.Unstructured)

		annotations := resource.GetAnnotations()
		if annotations == nil {
			annotations = make(map[string]string)
		}

		annotations[clusterv1alpha1.CacheSourceAnnotationKey] = clusterName

		resource.SetAnnotations(annotations)
		resources = append(resources, resource)
	}

	return resources
}
